http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacket.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacket.c b/thirdparty/paho.mqtt.c/src/MQTTPacket.c new file mode 100644 index 0000000..c21a432 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/MQTTPacket.c @@ -0,0 +1,755 @@ +/******************************************************************************* + * Copyright (c) 2009, 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs, Allan Stockdill-Mander - SSL updates + * Ian Craggs - MQTT 3.1.1 support + *******************************************************************************/ + +/** + * @file + * \brief functions to deal with reading and writing of MQTT packets from and to sockets + * + * Some other related functions are in the MQTTPacketOut module + */ + +#include "MQTTPacket.h" +#include "Log.h" +#if !defined(NO_PERSISTENCE) + #include "MQTTPersistence.h" +#endif +#include "Messages.h" +#include "StackTrace.h" + +#include <stdlib.h> +#include <string.h> + +#include "Heap.h" + +#if !defined(min) +#define min(A,B) ( (A) < (B) ? (A):(B)) +#endif + +/** + * List of the predefined MQTT v3 packet names. 
+ */ +static const char *packet_names[] = +{ + "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", + "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", + "PINGREQ", "PINGRESP", "DISCONNECT" +}; + +const char** MQTTClient_packet_names = packet_names; + + +/** + * Converts an MQTT packet code into its name + * @param ptype packet code + * @return the corresponding string, or "UNKNOWN" + */ +const char* MQTTPacket_name(int ptype) +{ + return (ptype >= 0 && ptype <= DISCONNECT) ? packet_names[ptype] : "UNKNOWN"; +} + +/** + * Array of functions to build packets, indexed according to packet code + */ +pf new_packets[] = +{ + NULL, /**< reserved */ + NULL, /**< MQTTPacket_connect*/ + MQTTPacket_connack, /**< CONNACK */ + MQTTPacket_publish, /**< PUBLISH */ + MQTTPacket_ack, /**< PUBACK */ + MQTTPacket_ack, /**< PUBREC */ + MQTTPacket_ack, /**< PUBREL */ + MQTTPacket_ack, /**< PUBCOMP */ + NULL, /**< MQTTPacket_subscribe*/ + MQTTPacket_suback, /**< SUBACK */ + NULL, /**< MQTTPacket_unsubscribe*/ + MQTTPacket_ack, /**< UNSUBACK */ + MQTTPacket_header_only, /**< PINGREQ */ + MQTTPacket_header_only, /**< PINGRESP */ + MQTTPacket_header_only /**< DISCONNECT */ +}; + + +static char* readUTFlen(char** pptr, char* enddata, int* len); +static int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net); + +/** + * Reads one MQTT packet from a socket. + * @param socket a socket from which to read an MQTT packet + * @param error pointer to the error code which is completed if no packet is returned + * @return the packet structure or NULL if there was an error + */ +void* MQTTPacket_Factory(networkHandles* net, int* error) +{ + char* data = NULL; + static Header header; + size_t remaining_length; + int ptype; + void* pack = NULL; + size_t actual_len = 0; + + FUNC_ENTRY; + *error = SOCKET_ERROR; /* indicate whether an error occurred, or not */ + + /* read the packet data from the socket */ +#if defined(OPENSSL) + *error = (net->ssl) ? 
SSLSocket_getch(net->ssl, net->socket, &header.byte) : Socket_getch(net->socket, &header.byte); +#else + *error = Socket_getch(net->socket, &header.byte); +#endif + if (*error != TCPSOCKET_COMPLETE) /* first byte is the header byte */ + goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */ + + /* now read the remaining length, so we know how much more to read */ + if ((*error = MQTTPacket_decode(net, &remaining_length)) != TCPSOCKET_COMPLETE) + goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */ + + /* now read the rest, the variable header and payload */ +#if defined(OPENSSL) + data = (net->ssl) ? SSLSocket_getdata(net->ssl, net->socket, remaining_length, &actual_len) : + Socket_getdata(net->socket, remaining_length, &actual_len); +#else + data = Socket_getdata(net->socket, remaining_length, &actual_len); +#endif + if (data == NULL) + { + *error = SOCKET_ERROR; + goto exit; /* socket error */ + } + + if (actual_len != remaining_length) + *error = TCPSOCKET_INTERRUPTED; + else + { + ptype = header.bits.type; + if (ptype < CONNECT || ptype > DISCONNECT || new_packets[ptype] == NULL) + Log(TRACE_MIN, 2, NULL, ptype); + else + { + if ((pack = (*new_packets[ptype])(header.byte, data, remaining_length)) == NULL) + *error = BAD_MQTT_PACKET; +#if !defined(NO_PERSISTENCE) + else if (header.bits.type == PUBLISH && header.bits.qos == 2) + { + int buf0len; + char *buf = malloc(10); + buf[0] = header.byte; + buf0len = 1 + MQTTPacket_encode(&buf[1], remaining_length); + *error = MQTTPersistence_put(net->socket, buf, buf0len, 1, + &data, &remaining_length, header.bits.type, ((Publish *)pack)->msgId, 1); + free(buf); + } +#endif + } + } + if (pack) + time(&(net->lastReceived)); +exit: + FUNC_EXIT_RC(*error); + return pack; +} + + +/** + * Sends an MQTT packet in one system call write + * @param socket the socket to which to write the data + * @param header the one-byte MQTT header + * @param buffer the rest of the buffer to 
write (not including remaining length) + * @param buflen the length of the data in buffer to be written + * @return the completion code (TCPSOCKET_COMPLETE etc) + */ +int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int freeData) +{ + int rc; + size_t buf0len; + char *buf; + + FUNC_ENTRY; + buf = malloc(10); + buf[0] = header.byte; + buf0len = 1 + MQTTPacket_encode(&buf[1], buflen); +#if !defined(NO_PERSISTENCE) + if (header.bits.type == PUBREL) + { + char* ptraux = buffer; + int msgId = readInt(&ptraux); + rc = MQTTPersistence_put(net->socket, buf, buf0len, 1, &buffer, &buflen, + header.bits.type, msgId, 0); + } +#endif + +#if defined(OPENSSL) + if (net->ssl) + rc = SSLSocket_putdatas(net->ssl, net->socket, buf, buf0len, 1, &buffer, &buflen, &freeData); + else +#endif + rc = Socket_putdatas(net->socket, buf, buf0len, 1, &buffer, &buflen, &freeData); + + if (rc == TCPSOCKET_COMPLETE) + time(&(net->lastSent)); + + if (rc != TCPSOCKET_INTERRUPTED) + free(buf); + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Sends an MQTT packet from multiple buffers in one system call write + * @param socket the socket to which to write the data + * @param header the one-byte MQTT header + * @param count the number of buffers + * @param buffers the rest of the buffers to write (not including remaining length) + * @param buflens the lengths of the data in the array of buffers to be written + * @return the completion code (TCPSOCKET_COMPLETE etc) + */ +int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees) +{ + int i, rc; + size_t buf0len, total = 0; + char *buf; + + FUNC_ENTRY; + buf = malloc(10); + buf[0] = header.byte; + for (i = 0; i < count; i++) + total += buflens[i]; + buf0len = 1 + MQTTPacket_encode(&buf[1], total); +#if !defined(NO_PERSISTENCE) + if (header.bits.type == PUBLISH && header.bits.qos != 0) + { /* persist PUBLISH QoS1 and Qo2 */ + char *ptraux = buffers[2]; + int msgId 
= readInt(&ptraux); + rc = MQTTPersistence_put(net->socket, buf, buf0len, count, buffers, buflens, + header.bits.type, msgId, 0); + } +#endif +#if defined(OPENSSL) + if (net->ssl) + rc = SSLSocket_putdatas(net->ssl, net->socket, buf, buf0len, count, buffers, buflens, frees); + else +#endif + rc = Socket_putdatas(net->socket, buf, buf0len, count, buffers, buflens, frees); + + if (rc == TCPSOCKET_COMPLETE) + time(&(net->lastSent)); + + if (rc != TCPSOCKET_INTERRUPTED) + free(buf); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Encodes the message length according to the MQTT algorithm + * @param buf the buffer into which the encoded data is written + * @param length the length to be encoded + * @return the number of bytes written to buffer + */ +int MQTTPacket_encode(char* buf, size_t length) +{ + int rc = 0; + + FUNC_ENTRY; + do + { + char d = length % 128; + length /= 128; + /* if there are more digits to encode, set the top bit of this digit */ + if (length > 0) + d |= 0x80; + buf[rc++] = d; + } while (length > 0); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Decodes the message length according to the MQTT algorithm + * @param socket the socket from which to read the bytes + * @param value the decoded length returned + * @return the number of bytes read from the socket + */ +int MQTTPacket_decode(networkHandles* net, size_t* value) +{ + int rc = SOCKET_ERROR; + char c; + int multiplier = 1; + int len = 0; +#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 + + FUNC_ENTRY; + *value = 0; + do + { + if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) + { + rc = SOCKET_ERROR; /* bad data */ + goto exit; + } +#if defined(OPENSSL) + rc = (net->ssl) ? 
SSLSocket_getch(net->ssl, net->socket, &c) : Socket_getch(net->socket, &c); +#else + rc = Socket_getch(net->socket, &c); +#endif + if (rc != TCPSOCKET_COMPLETE) + goto exit; + *value += (c & 127) * multiplier; + multiplier *= 128; + } while ((c & 128) != 0); +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Calculates an integer from two bytes read from the input buffer + * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned + * @return the integer value calculated + */ +int readInt(char** pptr) +{ + char* ptr = *pptr; + int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1)); + *pptr += 2; + return len; +} + + +/** + * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means + * a length delimited string. So it reads the two byte length then the data according to + * that length. The end of the buffer is provided too, so we can prevent buffer overruns caused + * by an incorrect length. + * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned + * @param enddata pointer to the end of the buffer not to be read beyond + * @param len returns the calculcated value of the length bytes read + * @return an allocated C string holding the characters read, or NULL if the length read would + * have caused an overrun. + * + */ +static char* readUTFlen(char** pptr, char* enddata, int* len) +{ + char* string = NULL; + + FUNC_ENTRY; + if (enddata - (*pptr) > 1) /* enough length to read the integer? */ + { + *len = readInt(pptr); + if (&(*pptr)[*len] <= enddata) + { + string = malloc(*len+1); + memcpy(string, *pptr, *len); + string[*len] = '\0'; + *pptr += *len; + } + } + FUNC_EXIT; + return string; +} + + +/** + * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means + * a length delimited string. So it reads the two byte length then the data according to + * that length. 
The end of the buffer is provided too, so we can prevent buffer overruns caused + * by an incorrect length. + * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned + * @param enddata pointer to the end of the buffer not to be read beyond + * @return an allocated C string holding the characters read, or NULL if the length read would + * have caused an overrun. + */ +char* readUTF(char** pptr, char* enddata) +{ + int len; + return readUTFlen(pptr, enddata, &len); +} + + +/** + * Reads one character from the input buffer. + * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned + * @return the character read + */ +unsigned char readChar(char** pptr) +{ + unsigned char c = **pptr; + (*pptr)++; + return c; +} + + +/** + * Writes one character to an output buffer. + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param c the character to write + */ +void writeChar(char** pptr, char c) +{ + **pptr = c; + (*pptr)++; +} + + +/** + * Writes an integer as 2 bytes to an output buffer. + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param anInt the integer to write + */ +void writeInt(char** pptr, int anInt) +{ + **pptr = (char)(anInt / 256); + (*pptr)++; + **pptr = (char)(anInt % 256); + (*pptr)++; +} + + +/** + * Writes a "UTF" string to an output buffer. Converts C string to length-delimited. 
+ * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param string the C string to write + */ +void writeUTF(char** pptr, const char* string) +{ + size_t len = strlen(string); + writeInt(pptr, (int)len); + memcpy(*pptr, string, len); + *pptr += len; +} + + +/** + * Writes length delimited data to an output buffer + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param data the data to write + * @param datalen the length of the data to write + */ +void writeData(char** pptr, const void* data, int datalen) +{ + writeInt(pptr, datalen); + memcpy(*pptr, data, datalen); + *pptr += datalen; +} + + +/** + * Function used in the new packets table to create packets which have only a header. + * @param aHeader the MQTT header byte + * @param data the rest of the packet + * @param datalen the length of the rest of the packet + * @return pointer to the packet structure + */ +void* MQTTPacket_header_only(unsigned char aHeader, char* data, size_t datalen) +{ + static unsigned char header = 0; + header = aHeader; + return &header; +} + + +/** + * Send an MQTT disconnect packet down a socket. + * @param socket the open socket to send the data to + * @return the completion code (e.g. TCPSOCKET_COMPLETE) + */ +int MQTTPacket_send_disconnect(networkHandles *net, const char* clientID) +{ + Header header; + int rc = 0; + + FUNC_ENTRY; + header.byte = 0; + header.bits.type = DISCONNECT; + rc = MQTTPacket_send(net, header, NULL, 0, 0); + Log(LOG_PROTOCOL, 28, NULL, net->socket, clientID, rc); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Function used in the new packets table to create publish packets. 
+ * @param aHeader the MQTT header byte + * @param data the rest of the packet + * @param datalen the length of the rest of the packet + * @return pointer to the packet structure + */ +void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen) +{ + Publish* pack = malloc(sizeof(Publish)); + char* curdata = data; + char* enddata = &data[datalen]; + + FUNC_ENTRY; + pack->header.byte = aHeader; + if ((pack->topic = readUTFlen(&curdata, enddata, &pack->topiclen)) == NULL) /* Topic name on which to publish */ + { + free(pack); + pack = NULL; + goto exit; + } + if (pack->header.bits.qos > 0) /* Msgid only exists for QoS 1 or 2 */ + pack->msgId = readInt(&curdata); + else + pack->msgId = 0; + pack->payload = curdata; + pack->payloadlen = (int)(datalen-(curdata-data)); +exit: + FUNC_EXIT; + return pack; +} + + +/** + * Free allocated storage for a publish packet. + * @param pack pointer to the publish packet structure + */ +void MQTTPacket_freePublish(Publish* pack) +{ + FUNC_ENTRY; + if (pack->topic != NULL) + free(pack->topic); + free(pack); + FUNC_EXIT; +} + + +/** + * Send an MQTT acknowledgement packet down a socket. + * @param type the MQTT packet type e.g. SUBACK + * @param msgid the MQTT message id to use + * @param dup boolean - whether to set the MQTT DUP flag + * @param net the network handle to send the data to + * @return the completion code (e.g. TCPSOCKET_COMPLETE) + */ +static int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net) +{ + Header header; + int rc; + char *buf = malloc(2); + char *ptr = buf; + + FUNC_ENTRY; + header.byte = 0; + header.bits.type = type; + header.bits.dup = dup; + if (type == PUBREL) + header.bits.qos = 1; + writeInt(&ptr, msgid); + if ((rc = MQTTPacket_send(net, header, buf, 2, 1)) != TCPSOCKET_INTERRUPTED) + free(buf); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Send an MQTT PUBACK packet down a socket. 
+ * @param msgid the MQTT message id to use + * @param socket the open socket to send the data to + * @param clientID the string client identifier, only used for tracing + * @return the completion code (e.g. TCPSOCKET_COMPLETE) + */ +int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* clientID) +{ + int rc = 0; + + FUNC_ENTRY; + rc = MQTTPacket_send_ack(PUBACK, msgid, 0, net); + Log(LOG_PROTOCOL, 12, NULL, net->socket, clientID, msgid, rc); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Free allocated storage for a suback packet. + * @param pack pointer to the suback packet structure + */ +void MQTTPacket_freeSuback(Suback* pack) +{ + FUNC_ENTRY; + if (pack->qoss != NULL) + ListFree(pack->qoss); + free(pack); + FUNC_EXIT; +} + + +/** + * Send an MQTT PUBREC packet down a socket. + * @param msgid the MQTT message id to use + * @param socket the open socket to send the data to + * @param clientID the string client identifier, only used for tracing + * @return the completion code (e.g. TCPSOCKET_COMPLETE) + */ +int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* clientID) +{ + int rc = 0; + + FUNC_ENTRY; + rc = MQTTPacket_send_ack(PUBREC, msgid, 0, net); + Log(LOG_PROTOCOL, 13, NULL, net->socket, clientID, msgid, rc); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Send an MQTT PUBREL packet down a socket. + * @param msgid the MQTT message id to use + * @param dup boolean - whether to set the MQTT DUP flag + * @param socket the open socket to send the data to + * @param clientID the string client identifier, only used for tracing + * @return the completion code (e.g. TCPSOCKET_COMPLETE) + */ +int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const char* clientID) +{ + int rc = 0; + + FUNC_ENTRY; + rc = MQTTPacket_send_ack(PUBREL, msgid, dup, net); + Log(LOG_PROTOCOL, 16, NULL, net->socket, clientID, msgid, rc); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Send an MQTT PUBCOMP packet down a socket. 
+ * @param msgid the MQTT message id to use + * @param socket the open socket to send the data to + * @param clientID the string client identifier, only used for tracing + * @return the completion code (e.g. TCPSOCKET_COMPLETE) + */ +int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID) +{ + int rc = 0; + + FUNC_ENTRY; + rc = MQTTPacket_send_ack(PUBCOMP, msgid, 0, net); + Log(LOG_PROTOCOL, 18, NULL, net->socket, clientID, msgid, rc); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Function used in the new packets table to create acknowledgement packets. + * @param aHeader the MQTT header byte + * @param data the rest of the packet + * @param datalen the length of the rest of the packet + * @return pointer to the packet structure + */ +void* MQTTPacket_ack(unsigned char aHeader, char* data, size_t datalen) +{ + Ack* pack = malloc(sizeof(Ack)); + char* curdata = data; + + FUNC_ENTRY; + pack->header.byte = aHeader; + pack->msgId = readInt(&curdata); + FUNC_EXIT; + return pack; +} + + +/** + * Send an MQTT PUBLISH packet down a socket. + * @param pack a structure from which to get some values to use, e.g topic, payload + * @param dup boolean - whether to set the MQTT DUP flag + * @param qos the value to use for the MQTT QoS setting + * @param retained boolean - whether to set the MQTT retained flag + * @param socket the open socket to send the data to + * @param clientID the string client identifier, only used for tracing + * @return the completion code (e.g. 
TCPSOCKET_COMPLETE) + */ +int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID) +{ + Header header; + char *topiclen; + int rc = -1; + + FUNC_ENTRY; + topiclen = malloc(2); + + header.bits.type = PUBLISH; + header.bits.dup = dup; + header.bits.qos = qos; + header.bits.retain = retained; + if (qos > 0) + { + char *buf = malloc(2); + char *ptr = buf; + char* bufs[4] = {topiclen, pack->topic, buf, pack->payload}; + size_t lens[4] = {2, strlen(pack->topic), 2, pack->payloadlen}; + int frees[4] = {1, 0, 1, 0}; + + writeInt(&ptr, pack->msgId); + ptr = topiclen; + writeInt(&ptr, (int)lens[1]); + rc = MQTTPacket_sends(net, header, 4, bufs, lens, frees); + if (rc != TCPSOCKET_INTERRUPTED) + free(buf); + } + else + { + char* ptr = topiclen; + char* bufs[3] = {topiclen, pack->topic, pack->payload}; + size_t lens[3] = {2, strlen(pack->topic), pack->payloadlen}; + int frees[3] = {1, 0, 0}; + + writeInt(&ptr, (int)lens[1]); + rc = MQTTPacket_sends(net, header, 3, bufs, lens, frees); + } + if (rc != TCPSOCKET_INTERRUPTED) + free(topiclen); + if (qos == 0) + Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, retained, rc); + else + Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, qos, retained, rc, + min(20, pack->payloadlen), pack->payload); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Free allocated storage for a various packet tyoes + * @param pack pointer to the suback packet structure + */ +void MQTTPacket_free_packet(MQTTPacket* pack) +{ + FUNC_ENTRY; + if (pack->header.bits.type == PUBLISH) + MQTTPacket_freePublish((Publish*)pack); + /*else if (pack->header.type == SUBSCRIBE) + MQTTPacket_freeSubscribe((Subscribe*)pack, 1); + else if (pack->header.type == UNSUBSCRIBE) + MQTTPacket_freeUnsubscribe((Unsubscribe*)pack);*/ + else + free(pack); + FUNC_EXIT; +}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacket.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacket.h b/thirdparty/paho.mqtt.c/src/MQTTPacket.h new file mode 100644 index 0000000..8bad955 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/MQTTPacket.h @@ -0,0 +1,262 @@ +/******************************************************************************* + * Copyright (c) 2009, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs, Allan Stockdill-Mander - SSL updates + * Ian Craggs - MQTT 3.1.1 support + * Ian Craggs - big endian Linux reversed definition + *******************************************************************************/ + +#if !defined(MQTTPACKET_H) +#define MQTTPACKET_H + +#include "Socket.h" +#if defined(OPENSSL) +#include "SSLSocket.h" +#endif +#include "LinkedList.h" +#include "Clients.h" + +/*BE +include "Socket" +include "LinkedList" +include "Clients" +BE*/ + +typedef unsigned int bool; /**< project-local boolean type, used for the one-bit flag fields of the packet structures */ +typedef void* (*pf)(unsigned char, char*, size_t); /**< packet constructor: header byte, remaining packet data, data length -> packet structure */ + +#define BAD_MQTT_PACKET -4 /**< error code reported when a packet constructor returns NULL */ + +enum msgTypes /**< MQTT packet type codes, as carried in the type nibble of the header byte */ +{ + CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, + PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, + PINGREQ, PINGRESP, DISCONNECT +}; + +#if defined(__linux__) +#include <endian.h> +#if __BYTE_ORDER == __BIG_ENDIAN + #define REVERSED 1 /**< big-endian Linux: selects the reversed bit-field layouts */ +#endif +#endif + +/** + * Bitfields for the MQTT header byte. 
+ */ +typedef union +{ + /*unsigned*/ char byte; /**< the whole header byte, overlaid on the bit fields below */ +#if defined(REVERSED) + struct + { + unsigned int type : 4; /**< message type nibble */ + bool dup : 1; /**< DUP flag bit */ + unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */ + bool retain : 1; /**< retained flag bit */ + } bits; +#else + struct + { + bool retain : 1; /**< retained flag bit */ + unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */ + bool dup : 1; /**< DUP flag bit */ + unsigned int type : 4; /**< message type nibble */ + } bits; +#endif +} Header; + + +/** + * Data for a connect packet: the header byte, the connect flags byte and the + * connection parameters. + */ +typedef struct +{ + Header header; /**< MQTT header byte */ + union + { + unsigned char all; /**< all connect flags, as one byte */ +#if defined(REVERSED) + struct + { + bool username : 1; /**< 3.1 user name */ + bool password : 1; /**< 3.1 password */ + bool willRetain : 1; /**< will retain setting */ + unsigned int willQoS : 2; /**< will QoS value */ + bool will : 1; /**< will flag */ + bool cleanstart : 1; /**< cleansession flag */ + int : 1; /**< unused */ + } bits; +#else + struct + { + int : 1; /**< unused */ + bool cleanstart : 1; /**< cleansession flag */ + bool will : 1; /**< will flag */ + unsigned int willQoS : 2; /**< will QoS value */ + bool willRetain : 1; /**< will retain setting */ + bool password : 1; /**< 3.1 password */ + bool username : 1; /**< 3.1 user name */ + } bits; +#endif + } flags; /**< connect flags byte */ + + char *Protocol, /**< MQTT protocol name */ + *clientID, /**< string client id */ + *willTopic, /**< will topic */ + *willMsg; /**< will payload */ + + int keepAliveTimer; /**< keepalive timeout value in seconds */ + unsigned char version; /**< MQTT version number */ +} Connect; + + +/** + * Data for a connack packet. 
+ */ +typedef struct +{ + Header header; /**< MQTT header byte */ + union + { + unsigned char all; /**< all connack flags, as one byte */ +#if defined(REVERSED) + struct + { + unsigned int reserved : 7; /**< remaining (reserved) bits of the connack flags byte */ + bool sessionPresent : 1; /**< was a session found on the server? */ + } bits; +#else + struct + { + bool sessionPresent : 1; /**< was a session found on the server? */ + unsigned int reserved : 7; /**< remaining (reserved) bits of the connack flags byte */ + } bits; +#endif + } flags; /**< connack flags byte */ + char rc; /**< connack return code */ +} Connack; + + +/** + * Data for a packet with header only. + */ +typedef struct +{ + Header header; /**< MQTT header byte */ +} MQTTPacket; + + +/** + * Data for a subscribe packet. + */ +typedef struct +{ + Header header; /**< MQTT header byte */ + int msgId; /**< MQTT message id */ + List* topics; /**< list of topic strings */ + List* qoss; /**< list of corresponding QoSs */ + int noTopics; /**< topic and qos count */ +} Subscribe; + + +/** + * Data for a suback packet. + */ +typedef struct +{ + Header header; /**< MQTT header byte */ + int msgId; /**< MQTT message id */ + List* qoss; /**< list of granted QoSs */ +} Suback; + + +/** + * Data for an unsubscribe packet. + */ +typedef struct +{ + Header header; /**< MQTT header byte */ + int msgId; /**< MQTT message id */ + List* topics; /**< list of topic strings */ + int noTopics; /**< topic count */ +} Unsubscribe; + + +/** + * Data for a publish packet. + */ +typedef struct +{ + Header header; /**< MQTT header byte */ + char* topic; /**< topic string */ + int topiclen; /**< length of the topic, as read from its two-byte length prefix */ + int msgId; /**< MQTT message id */ + char* payload; /**< binary payload, length delimited */ + int payloadlen; /**< payload length */ +} Publish; + + +/** + * Data for one of the ack packets. 
+ */ +typedef struct +{ + Header header; /**< MQTT header byte */ + int msgId; /**< MQTT message id */ +} Ack; /**< layout shared by all the acknowledgement packet types aliased below */ + +typedef Ack Puback; +typedef Ack Pubrec; +typedef Ack Pubrel; +typedef Ack Pubcomp; +typedef Ack Unsuback; + +int MQTTPacket_encode(char* buf, size_t length); +int MQTTPacket_decode(networkHandles* net, size_t* value); +int readInt(char** pptr); +char* readUTF(char** pptr, char* enddata); +unsigned char readChar(char** pptr); +void writeChar(char** pptr, char c); +void writeInt(char** pptr, int anInt); +void writeUTF(char** pptr, const char* string); +void writeData(char** pptr, const void* data, int datalen); /* the read/write helpers above each advance *pptr past the bytes they consume or produce */ + +const char* MQTTPacket_name(int ptype); + +void* MQTTPacket_Factory(networkHandles* net, int* error); +int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int free); +int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees); + +void* MQTTPacket_header_only(unsigned char aHeader, char* data, size_t datalen); +int MQTTPacket_send_disconnect(networkHandles* net, const char* clientID); + +void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen); +void MQTTPacket_freePublish(Publish* pack); +int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID); +int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* clientID); +void* MQTTPacket_ack(unsigned char aHeader, char* data, size_t datalen); + +void MQTTPacket_freeSuback(Suback* pack); +int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* clientID); +int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const char* clientID); +int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID); + +void MQTTPacket_free_packet(MQTTPacket* pack); + +#if !defined(NO_BRIDGE) + #include "MQTTPacketOut.h" +#endif + +#endif /* MQTTPACKET_H */ 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c new file mode 100644 index 0000000..b924085 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c @@ -0,0 +1,269 @@ +/******************************************************************************* + * Copyright (c) 2009, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs, Allan Stockdill-Mander - SSL updates + * Ian Craggs - MQTT 3.1.1 support + * Rong Xiang, Ian Craggs - C++ compatibility + * Ian Craggs - binary password and will payload + *******************************************************************************/ + +/** + * @file + * \brief functions to deal with reading and writing of MQTT packets from and to sockets + * + * Some other related functions are in the MQTTPacket module + */ + + +#include "MQTTPacketOut.h" +#include "Log.h" +#include "StackTrace.h" + +#include <string.h> +#include <stdlib.h> + +#include "Heap.h" + + +/** + * Send an MQTT CONNECT packet down a socket. + * @param client a structure from which to get all the required values + * @param MQTTVersion the MQTT version to connect with + * @return the completion code (e.g. 
TCPSOCKET_COMPLETE), or -1 if MQTTVersion is neither 3 nor 4
 *         or the packet buffer cannot be allocated
 */
int MQTTPacket_send_connect(Clients* client, int MQTTVersion)
{
	char *buf = NULL, *ptr;
	Connect packet;
	int rc = -1, len;

	FUNC_ENTRY;
	packet.header.byte = 0;
	packet.header.bits.type = CONNECT;

	/* protocol name (with length prefix) + version + flags + keepalive: 12 bytes for "MQIsdp", 10 for "MQTT" */
	len = ((MQTTVersion == 3) ? 12 : 10) + (int)strlen(client->clientID)+2;
	if (client->will)
		len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2;
	if (client->username)
		len += (int)strlen(client->username)+2;
	if (client->password)
		len += client->passwordlen+2;

	ptr = buf = malloc(len);
	if (buf == NULL)
		goto exit;
	if (MQTTVersion == 3)
	{
		writeUTF(&ptr, "MQIsdp");
		writeChar(&ptr, (char)3);
	}
	else if (MQTTVersion == 4)
	{
		writeUTF(&ptr, "MQTT");
		writeChar(&ptr, (char)4);
	}
	else
		goto exit;

	packet.flags.all = 0;
	packet.flags.bits.cleanstart = client->cleansession;
	packet.flags.bits.will = (client->will) ? 1 : 0;
	if (packet.flags.bits.will)
	{
		packet.flags.bits.willQoS = client->will->qos;
		packet.flags.bits.willRetain = client->will->retained;
	}

	if (client->username)
		packet.flags.bits.username = 1;
	if (client->password)
		packet.flags.bits.password = 1;

	writeChar(&ptr, packet.flags.all);
	writeInt(&ptr, client->keepAliveInterval);
	writeUTF(&ptr, client->clientID);
	if (client->will)
	{
		writeUTF(&ptr, client->will->topic);
		writeData(&ptr, client->will->payload, client->will->payloadlen);
	}
	if (client->username)
		writeUTF(&ptr, client->username);
	if (client->password)
		writeData(&ptr, client->password, client->passwordlen);

	rc = MQTTPacket_send(&client->net, packet.header, buf, len, 1);
	Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID, client->cleansession, rc);
exit:
	/* buf is NULL when the allocation failed; Heap.h may wrap free(), so do not pass NULL to it */
	if (rc != TCPSOCKET_INTERRUPTED && buf != NULL)
		free(buf);
	FUNC_EXIT_RC(rc);
	return rc;
}


/**
 * Function used in the new packets table to create connack packets.
+ * @param aHeader the MQTT header byte + * @param data the rest of the packet + * @param datalen the length of the rest of the packet + * @return pointer to the packet structure + */ +void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen) +{ + Connack* pack = malloc(sizeof(Connack)); + char* curdata = data; + + FUNC_ENTRY; + pack->header.byte = aHeader; + pack->flags.all = readChar(&curdata); + pack->rc = readChar(&curdata); + FUNC_EXIT; + return pack; +} + + +/** + * Send an MQTT PINGREQ packet down a socket. + * @param socket the open socket to send the data to + * @param clientID the string client identifier, only used for tracing + * @return the completion code (e.g. TCPSOCKET_COMPLETE) + */ +int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID) +{ + Header header; + int rc = 0; + size_t buflen = 0; + + FUNC_ENTRY; + header.byte = 0; + header.bits.type = PINGREQ; + rc = MQTTPacket_send(net, header, NULL, buflen,0); + Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, rc); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Send an MQTT subscribe packet down a socket. + * @param topics list of topics + * @param qoss list of corresponding QoSs + * @param msgid the MQTT message id to use + * @param dup boolean - whether to set the MQTT DUP flag + * @param socket the open socket to send the data to + * @param clientID the string client identifier, only used for tracing + * @return the completion code (e.g. 
TCPSOCKET_COMPLETE) + */ +int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, networkHandles* net, const char* clientID) +{ + Header header; + char *data, *ptr; + int rc = -1; + ListElement *elem = NULL, *qosElem = NULL; + int datalen; + + FUNC_ENTRY; + header.bits.type = SUBSCRIBE; + header.bits.dup = dup; + header.bits.qos = 1; + header.bits.retain = 0; + + datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */ + while (ListNextElement(topics, &elem)) + datalen += (int)strlen((char*)(elem->content)); + ptr = data = malloc(datalen); + + writeInt(&ptr, msgid); + elem = NULL; + while (ListNextElement(topics, &elem)) + { + ListNextElement(qoss, &qosElem); + writeUTF(&ptr, (char*)(elem->content)); + writeChar(&ptr, *(int*)(qosElem->content)); + } + rc = MQTTPacket_send(net, header, data, datalen, 1); + Log(LOG_PROTOCOL, 22, NULL, net->socket, clientID, msgid, rc); + if (rc != TCPSOCKET_INTERRUPTED) + free(data); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Function used in the new packets table to create suback packets. + * @param aHeader the MQTT header byte + * @param data the rest of the packet + * @param datalen the length of the rest of the packet + * @return pointer to the packet structure + */ +void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen) +{ + Suback* pack = malloc(sizeof(Suback)); + char* curdata = data; + + FUNC_ENTRY; + pack->header.byte = aHeader; + pack->msgId = readInt(&curdata); + pack->qoss = ListInitialize(); + while ((size_t)(curdata - data) < datalen) + { + int* newint; + newint = malloc(sizeof(int)); + *newint = (int)readChar(&curdata); + ListAppend(pack->qoss, newint, sizeof(int)); + } + FUNC_EXIT; + return pack; +} + + +/** + * Send an MQTT unsubscribe packet down a socket. 
+ * @param topics list of topics + * @param msgid the MQTT message id to use + * @param dup boolean - whether to set the MQTT DUP flag + * @param socket the open socket to send the data to + * @param clientID the string client identifier, only used for tracing + * @return the completion code (e.g. TCPSOCKET_COMPLETE) + */ +int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles* net, const char* clientID) +{ + Header header; + char *data, *ptr; + int rc = -1; + ListElement *elem = NULL; + int datalen; + + FUNC_ENTRY; + header.bits.type = UNSUBSCRIBE; + header.bits.dup = dup; + header.bits.qos = 1; + header.bits.retain = 0; + + datalen = 2 + topics->count * 2; /* utf length == 2 */ + while (ListNextElement(topics, &elem)) + datalen += (int)strlen((char*)(elem->content)); + ptr = data = malloc(datalen); + + writeInt(&ptr, msgid); + elem = NULL; + while (ListNextElement(topics, &elem)) + writeUTF(&ptr, (char*)(elem->content)); + rc = MQTTPacket_send(net, header, data, datalen, 1); + Log(LOG_PROTOCOL, 25, NULL, net->socket, clientID, msgid, rc); + if (rc != TCPSOCKET_INTERRUPTED) + free(data); + FUNC_EXIT_RC(rc); + return rc; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h new file mode 100644 index 0000000..700db77 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h @@ -0,0 +1,34 @@ +/******************************************************************************* + * Copyright (c) 2009, 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs, Allan Stockdill-Mander - SSL updates + * Ian Craggs - MQTT 3.1.1 support + *******************************************************************************/ + +#if !defined(MQTTPACKETOUT_H) +#define MQTTPACKETOUT_H + +#include "MQTTPacket.h" + +int MQTTPacket_send_connect(Clients* client, int MQTTVersion); +void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen); + +int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID); + +int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, networkHandles* net, const char* clientID); +void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen); + +int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles* net, const char* clientID); + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistence.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistence.c b/thirdparty/paho.mqtt.c/src/MQTTPersistence.c new file mode 100644 index 0000000..24efb6d --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/MQTTPersistence.c @@ -0,0 +1,654 @@ +/******************************************************************************* + * Copyright (c) 2009, 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs - async client updates + * Ian Craggs - fix for bug 432903 - queue persistence + *******************************************************************************/ + +/** + * @file + * \brief Functions that apply to persistence operations. + * + */ + +#include <stdio.h> +#include <string.h> + +#include "MQTTPersistence.h" +#include "MQTTPersistenceDefault.h" +#include "MQTTProtocolClient.h" +#include "Heap.h" + + +static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen); +static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size); + +/** + * Creates a ::MQTTClient_persistence structure representing a persistence implementation. + * @param persistence the ::MQTTClient_persistence structure. + * @param type the type of the persistence implementation. See ::MQTTClient_create. + * @param pcontext the context for this persistence implementation. See ::MQTTClient_create. + * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. 
+ */ +#include "StackTrace.h" + +int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, void* pcontext) +{ + int rc = 0; + MQTTClient_persistence* per = NULL; + + FUNC_ENTRY; +#if !defined(NO_PERSISTENCE) + switch (type) + { + case MQTTCLIENT_PERSISTENCE_NONE : + per = NULL; + break; + case MQTTCLIENT_PERSISTENCE_DEFAULT : + per = malloc(sizeof(MQTTClient_persistence)); + if ( per != NULL ) + { + if ( pcontext != NULL ) + { + per->context = malloc(strlen(pcontext) + 1); + strcpy(per->context, pcontext); + } + else + per->context = "."; /* working directory */ + /* file system functions */ + per->popen = pstopen; + per->pclose = pstclose; + per->pput = pstput; + per->pget = pstget; + per->premove = pstremove; + per->pkeys = pstkeys; + per->pclear = pstclear; + per->pcontainskey = pstcontainskey; + } + else + rc = MQTTCLIENT_PERSISTENCE_ERROR; + break; + case MQTTCLIENT_PERSISTENCE_USER : + per = (MQTTClient_persistence *)pcontext; + if ( per == NULL || (per != NULL && (per->context == NULL || per->pclear == NULL || + per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL || per->pkeys == NULL || + per->popen == NULL || per->pput == NULL || per->premove == NULL)) ) + rc = MQTTCLIENT_PERSISTENCE_ERROR; + break; + default: + rc = MQTTCLIENT_PERSISTENCE_ERROR; + break; + } +#endif + + *persistence = per; + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Open persistent store and restore any persisted messages. + * @param client the client as ::Clients. + * @param serverURI the URI of the remote end. + * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. + */ +int MQTTPersistence_initialize(Clients *c, const char *serverURI) +{ + int rc = 0; + + FUNC_ENTRY; + if ( c->persistence != NULL ) + { + rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context); + if ( rc == 0 ) + rc = MQTTPersistence_restore(c); + } + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Close persistent store. 
+ * @param client the client as ::Clients. + * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. + */ +int MQTTPersistence_close(Clients *c) +{ + int rc =0; + + FUNC_ENTRY; + if (c->persistence != NULL) + { + rc = c->persistence->pclose(c->phandle); + c->phandle = NULL; +#if !defined(NO_PERSISTENCE) + if ( c->persistence->popen == pstopen ) + free(c->persistence); +#endif + c->persistence = NULL; + } + + FUNC_EXIT_RC(rc); + return rc; +} + +/** + * Clears the persistent store. + * @param client the client as ::Clients. + * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. + */ +int MQTTPersistence_clear(Clients *c) +{ + int rc = 0; + + FUNC_ENTRY; + if (c->persistence != NULL) + rc = c->persistence->pclear(c->phandle); + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Restores the persisted records to the outbound and inbound message queues of the + * client. + * @param client the client as ::Clients. + * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. 
+ */ +int MQTTPersistence_restore(Clients *c) +{ + int rc = 0; + char **msgkeys = NULL, + *buffer = NULL; + int nkeys, buflen; + int i = 0; + int msgs_sent = 0; + int msgs_rcvd = 0; + + FUNC_ENTRY; + if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0) + { + while (rc == 0 && i < nkeys) + { + if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0) + { + ; + } + else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0) + { + ; + } + else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0) + { + MQTTPacket* pack = MQTTPersistence_restorePacket(buffer, buflen); + if ( pack != NULL ) + { + if ( strstr(msgkeys[i],PERSISTENCE_PUBLISH_RECEIVED) != NULL ) + { + Publish* publish = (Publish*)pack; + Messages* msg = NULL; + msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain); + msg->nextMessageType = PUBREL; + /* order does not matter for persisted received messages */ + ListAppend(c->inboundMsgs, msg, msg->len); + publish->topic = NULL; + MQTTPacket_freePublish(publish); + msgs_rcvd++; + } + else if ( strstr(msgkeys[i],PERSISTENCE_PUBLISH_SENT) != NULL ) + { + Publish* publish = (Publish*)pack; + Messages* msg = NULL; + char *key = malloc(MESSAGE_FILENAME_LENGTH + 1); + sprintf(key, "%s%d", PERSISTENCE_PUBREL, publish->msgId); + msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain); + if ( c->persistence->pcontainskey(c->phandle, key) == 0 ) + /* PUBLISH Qo2 and PUBREL sent */ + msg->nextMessageType = PUBCOMP; + /* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */ + /* retry at the first opportunity */ + msg->lastTouch = 0; + MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len); + publish->topic = NULL; + MQTTPacket_freePublish(publish); + free(key); + msgs_sent++; + } + else if ( strstr(msgkeys[i],PERSISTENCE_PUBREL) != NULL ) + { + /* 
orphaned PUBRELs ? */ + Pubrel* pubrel = (Pubrel*)pack; + char *key = malloc(MESSAGE_FILENAME_LENGTH + 1); + sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId); + if ( c->persistence->pcontainskey(c->phandle, key) != 0 ) + rc = c->persistence->premove(c->phandle, msgkeys[i]); + free(pubrel); + free(key); + } + } + else /* pack == NULL -> bad persisted record */ + rc = c->persistence->premove(c->phandle, msgkeys[i]); + } + if (buffer) + { + free(buffer); + buffer = NULL; + } + if (msgkeys[i]) + free(msgkeys[i]); + i++; + } + if (msgkeys) + free(msgkeys); + } + Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n", + msgs_sent, msgs_rcvd, c->clientID); + MQTTPersistence_wrapMsgID(c); + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Returns a MQTT packet restored from persisted data. + * @param buffer the persisted data. + * @param buflen the number of bytes of the data buffer. + */ +void* MQTTPersistence_restorePacket(char* buffer, size_t buflen) +{ + void* pack = NULL; + Header header; + int fixed_header_length = 1, ptype, remaining_length = 0; + char c; + int multiplier = 1; + extern pf new_packets[]; + + FUNC_ENTRY; + header.byte = buffer[0]; + /* decode the message length according to the MQTT algorithm */ + do + { + c = *(++buffer); + remaining_length += (c & 127) * multiplier; + multiplier *= 128; + fixed_header_length++; + } while ((c & 128) != 0); + + if ( (fixed_header_length + remaining_length) == buflen ) + { + ptype = header.bits.type; + if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL) + pack = (*new_packets[ptype])(header.byte, ++buffer, remaining_length); + } + + FUNC_EXIT; + return pack; +} + + +/** + * Inserts the specified message into the list, maintaining message ID order. + * @param list the list to insert the message into. + * @param content the message to add. + * @param size size of the message. 
+ */ +void MQTTPersistence_insertInOrder(List* list, void* content, size_t size) +{ + ListElement* index = NULL; + ListElement* current = NULL; + + FUNC_ENTRY; + while(ListNextElement(list, ¤t) != NULL && index == NULL) + { + if ( ((Messages*)content)->msgid < ((Messages*)current->content)->msgid ) + index = current; + } + + ListInsert(list, content, size, index); + FUNC_EXIT; +} + + +/** + * Adds a record to the persistent store. This function must not be called for QoS0 + * messages. + * @param socket the socket of the client. + * @param buf0 fixed header. + * @param buf0len length of the fixed header. + * @param count number of buffers representing the variable header and/or the payload. + * @param buffers the buffers representing the variable header and/or the payload. + * @param buflens length of the buffers representing the variable header and/or the payload. + * @param msgId the message ID. + * @param scr 0 indicates message in the sending direction; 1 indicates message in the + * receiving direction. + * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. 
+ */ +int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count, + char** buffers, size_t* buflens, int htype, int msgId, int scr ) +{ + int rc = 0; + extern ClientStates* bstate; + int nbufs, i; + int* lens = NULL; + char** bufs = NULL; + char *key; + Clients* client = NULL; + + FUNC_ENTRY; + client = (Clients*)(ListFindItem(bstate->clients, &socket, clientSocketCompare)->content); + if (client->persistence != NULL) + { + key = malloc(MESSAGE_FILENAME_LENGTH + 1); + nbufs = 1 + count; + lens = (int *)malloc(nbufs * sizeof(int)); + bufs = (char **)malloc(nbufs * sizeof(char *)); + lens[0] = (int)buf0len; + bufs[0] = buf0; + for (i = 0; i < count; i++) + { + lens[i+1] = (int)buflens[i]; + bufs[i+1] = buffers[i]; + } + + /* key */ + if ( scr == 0 ) + { /* sending */ + if (htype == PUBLISH) /* PUBLISH QoS1 and QoS2*/ + sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId); + if (htype == PUBREL) /* PUBREL */ + sprintf(key, "%s%d", PERSISTENCE_PUBREL, msgId); + } + if ( scr == 1 ) /* receiving PUBLISH QoS2 */ + sprintf(key, "%s%d", PERSISTENCE_PUBLISH_RECEIVED, msgId); + + rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens); + + free(key); + free(lens); + free(bufs); + } + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Deletes a record from the persistent store. + * @param client the client as ::Clients. + * @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, #PERSISTENCE_PUBREL + * or #PERSISTENCE_PUBLISH_RECEIVED. + * @param qos the qos field of the message. + * @param msgId the message ID. + * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. 
+ */ +int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId) +{ + int rc = 0; + + FUNC_ENTRY; + if (c->persistence != NULL) + { + char *key = malloc(MESSAGE_FILENAME_LENGTH + 1); + if ( (strcmp(type,PERSISTENCE_PUBLISH_SENT) == 0) && qos == 2 ) + { + sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId) ; + rc = c->persistence->premove(c->phandle, key); + sprintf(key, "%s%d", PERSISTENCE_PUBREL, msgId) ; + rc = c->persistence->premove(c->phandle, key); + } + else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */ + { /* or PERSISTENCE_PUBLISH_RECEIVED */ + sprintf(key, "%s%d", type, msgId) ; + rc = c->persistence->premove(c->phandle, key); + } + free(key); + } + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Checks whether the message IDs wrapped by looking for the largest gap between two consecutive + * message IDs in the outboundMsgs queue. + * @param client the client as ::Clients. + */ +void MQTTPersistence_wrapMsgID(Clients *client) +{ + ListElement* wrapel = NULL; + ListElement* current = NULL; + + FUNC_ENTRY; + if ( client->outboundMsgs->count > 0 ) + { + int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid; + int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid; + int gap = MAX_MSG_ID - lastMsgID + firstMsgID; + current = ListNextElement(client->outboundMsgs, ¤t); + + while(ListNextElement(client->outboundMsgs, ¤t) != NULL) + { + int curMsgID = ((Messages*)current->content)->msgid; + int curPrevMsgID = ((Messages*)current->prev->content)->msgid; + int curgap = curMsgID - curPrevMsgID; + if ( curgap > gap ) + { + gap = curgap; + wrapel = current; + } + } + } + + if ( wrapel != NULL ) + { + /* put wrapel at the beginning of the queue */ + client->outboundMsgs->first->prev = client->outboundMsgs->last; + client->outboundMsgs->last->next = client->outboundMsgs->first; + client->outboundMsgs->first = wrapel; + client->outboundMsgs->last = wrapel->prev; + client->outboundMsgs->first->prev = NULL; + 
client->outboundMsgs->last->next = NULL; + } + FUNC_EXIT; +} + + +#if !defined(NO_PERSISTENCE) +int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe) +{ + int rc = 0; + char key[PERSISTENCE_MAX_KEY_LENGTH + 1]; + + FUNC_ENTRY; + sprintf(key, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno); + if ((rc = client->persistence->premove(client->phandle, key)) != 0) + Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc); + FUNC_EXIT_RC(rc); + return rc; +} + + +int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe) +{ + int rc = 0; + int nbufs = 8; + int bufindex = 0; + char key[PERSISTENCE_MAX_KEY_LENGTH + 1]; + int* lens = NULL; + void** bufs = NULL; + + FUNC_ENTRY; + lens = (int*)malloc(nbufs * sizeof(int)); + bufs = malloc(nbufs * sizeof(char *)); + + bufs[bufindex] = &qe->msg->payloadlen; + lens[bufindex++] = sizeof(qe->msg->payloadlen); + + bufs[bufindex] = qe->msg->payload; + lens[bufindex++] = qe->msg->payloadlen; + + bufs[bufindex] = &qe->msg->qos; + lens[bufindex++] = sizeof(qe->msg->qos); + + bufs[bufindex] = &qe->msg->retained; + lens[bufindex++] = sizeof(qe->msg->retained); + + bufs[bufindex] = &qe->msg->dup; + lens[bufindex++] = sizeof(qe->msg->dup); + + bufs[bufindex] = &qe->msg->msgid; + lens[bufindex++] = sizeof(qe->msg->msgid); + + bufs[bufindex] = qe->topicName; + lens[bufindex++] = (int)strlen(qe->topicName) + 1; + + bufs[bufindex] = &qe->topicLen; + lens[bufindex++] = sizeof(qe->topicLen); + + sprintf(key, "%s%d", PERSISTENCE_QUEUE_KEY, ++aclient->qentry_seqno); + qe->seqno = aclient->qentry_seqno; + + if ((rc = aclient->persistence->pput(aclient->phandle, key, nbufs, (char**)bufs, lens)) != 0) + Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc); + + free(lens); + free(bufs); + + FUNC_EXIT_RC(rc); + return rc; +} + + +static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen) +{ + MQTTPersistence_qEntry* qe = NULL; + char* ptr = buffer; + 
int data_size; + + FUNC_ENTRY; + qe = malloc(sizeof(MQTTPersistence_qEntry)); + memset(qe, '\0', sizeof(MQTTPersistence_qEntry)); + + qe->msg = malloc(sizeof(MQTTPersistence_message)); + memset(qe->msg, '\0', sizeof(MQTTPersistence_message)); + + qe->msg->payloadlen = *(int*)ptr; + ptr += sizeof(int); + + data_size = qe->msg->payloadlen; + qe->msg->payload = malloc(data_size); + memcpy(qe->msg->payload, ptr, data_size); + ptr += data_size; + + qe->msg->qos = *(int*)ptr; + ptr += sizeof(int); + + qe->msg->retained = *(int*)ptr; + ptr += sizeof(int); + + qe->msg->dup = *(int*)ptr; + ptr += sizeof(int); + + qe->msg->msgid = *(int*)ptr; + ptr += sizeof(int); + + data_size = (int)strlen(ptr) + 1; + qe->topicName = malloc(data_size); + strcpy(qe->topicName, ptr); + ptr += data_size; + + qe->topicLen = *(int*)ptr; + ptr += sizeof(int); + + FUNC_EXIT; + return qe; +} + + +static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size) +{ + ListElement* index = NULL; + ListElement* current = NULL; + + FUNC_ENTRY; + while (ListNextElement(list, ¤t) != NULL && index == NULL) + { + if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno) + index = current; + } + ListInsert(list, qEntry, size, index); + FUNC_EXIT; +} + + +/** + * Restores a queue of messages from persistence to memory + * @param c the client as ::Clients - the client object to restore the messages to + * @return return code, 0 if successful + */ +int MQTTPersistence_restoreMessageQueue(Clients* c) +{ + int rc = 0; + char **msgkeys; + int nkeys; + int i = 0; + int entries_restored = 0; + + FUNC_ENTRY; + if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0) + { + while (rc == 0 && i < nkeys) + { + char *buffer = NULL; + int buflen; + + if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0) + { + ; + } + else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0) + { + 
MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen); + + if (qe) + { + qe->seqno = atoi(msgkeys[i]+2); + MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry)); + free(buffer); + c->qentry_seqno = max(c->qentry_seqno, qe->seqno); + entries_restored++; + } + } + if (msgkeys[i]) + { + free(msgkeys[i]); + } + i++; + } + if (msgkeys != NULL) + free(msgkeys); + } + Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID); + FUNC_EXIT_RC(rc); + return rc; +} +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistence.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistence.h b/thirdparty/paho.mqtt.c/src/MQTTPersistence.h new file mode 100644 index 0000000..9a938ba --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/MQTTPersistence.h @@ -0,0 +1,74 @@ +/******************************************************************************* + * Copyright (c) 2009, 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs - async client updates + * Ian Craggs - fix for bug 432903 - queue persistence + *******************************************************************************/ + +#if defined(__cplusplus) + extern "C" { +#endif + +#include "Clients.h" + +/** Stem of the key for a sent PUBLISH QoS1 or QoS2 */ +#define PERSISTENCE_PUBLISH_SENT "s-" +/** Stem of the key for a sent PUBREL */ +#define PERSISTENCE_PUBREL "sc-" +/** Stem of the key for a received PUBLISH QoS2 */ +#define PERSISTENCE_PUBLISH_RECEIVED "r-" +/** Stem of the key for an async client command */ +#define PERSISTENCE_COMMAND_KEY "c-" +/** Stem of the key for an async client message queue */ +#define PERSISTENCE_QUEUE_KEY "q-" +/** Length, excluding the terminating NUL, of the key buffers declared for queue entry keys */ #define PERSISTENCE_MAX_KEY_LENGTH 8 + +int MQTTPersistence_create(MQTTClient_persistence** per, int type, void* pcontext); +int MQTTPersistence_initialize(Clients* c, const char* serverURI); +int MQTTPersistence_close(Clients* c); +int MQTTPersistence_clear(Clients* c); +int MQTTPersistence_restore(Clients* c); +void* MQTTPersistence_restorePacket(char* buffer, size_t buflen); +void MQTTPersistence_insertInOrder(List* list, void* content, size_t size); +int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count, + char** buffers, size_t* buflens, int htype, int msgId, int scr); +int MQTTPersistence_remove(Clients* c, char* type, int qos, int msgId); +void MQTTPersistence_wrapMsgID(Clients *c); + +/** A queued message; payloadlen, payload, qos, retained, dup and msgid are the members MQTTPersistence_persistQueueEntry writes to the store */ typedef struct +{ + char struct_id[4]; + int struct_version; + int payloadlen; + void* payload; + int qos; + int retained; + int dup; + int msgid; +} MQTTPersistence_message; + +/** One entry of a client's persisted message queue: the message, its topic name and length, and a sequence number */ typedef struct +{ + MQTTPersistence_message* msg; + char* topicName; + int topicLen; + unsigned int seqno; /* only used on restore */ +} MQTTPersistence_qEntry; + +int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe); +int 
MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe); +int MQTTPersistence_restoreMessageQueue(Clients* c); +#ifdef __cplusplus + } +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c new file mode 100644 index 0000000..35c1f53 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c @@ -0,0 +1,841 @@ +/******************************************************************************* + * Copyright (c) 2009, 2016 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs - async client updates + * Ian Craggs - fix for bug 484496 + *******************************************************************************/ + +/** + * @file + * \brief A file system based persistence implementation. + * + * A directory is specified when the MQTT client is created. When the persistence is then + * opened (see ::Persistence_open), a sub-directory is made beneath the base for this + * particular client ID and connection key. This allows one persistence base directory to + * be shared by multiple clients. 
+ * + */ + +#if !defined(NO_PERSISTENCE) + +#include "OsWrapper.h" + +#include <stdio.h> +#include <string.h> +#include <errno.h> + +#if defined(WIN32) || defined(WIN64) + #include <direct.h> + /* Windows doesn't have strtok_r, so remap it to strtok */ + #define strtok_r( A, B, C ) strtok( A, B ) + int keysWin32(char *, char ***, int *); + int clearWin32(char *); + int containskeyWin32(char *, char *); +#else + #include <sys/stat.h> + #include <dirent.h> + #include <unistd.h> + int keysUnix(char *, char ***, int *); + int clearUnix(char *); + int containskeyUnix(char *, char *); +#endif + +#include "MQTTClientPersistence.h" +#include "MQTTPersistenceDefault.h" +#include "StackTrace.h" +#include "Heap.h" + +/** Create persistence directory for the client: context/clientID-serverURI. + * See ::Persistence_open + */ + +int pstopen(void **handle, const char* clientID, const char* serverURI, void* context) +{ + int rc = 0; + char *dataDir = context; + char *clientDir; + char *pToken = NULL; + char *save_ptr = NULL; + char *pCrtDirName = NULL; + char *pTokDirName = NULL; + char *perserverURI = NULL, *ptraux; + + FUNC_ENTRY; + /* Note that serverURI=address:port, but ":" not allowed in Windows directories */ + perserverURI = malloc(strlen(serverURI) + 1); + strcpy(perserverURI, serverURI); + while ((ptraux = strstr(perserverURI, ":")) != NULL) + *ptraux = '-' ; + + /* consider '/' + '-' + '\0' */ + clientDir = malloc(strlen(dataDir) + strlen(clientID) + strlen(perserverURI) + 3); + sprintf(clientDir, "%s/%s-%s", dataDir, clientID, perserverURI); + + + /* create clientDir directory */ + + /* pCrtDirName - holds the directory name we are currently trying to create. */ + /* This gets built up level by level until the full path name is created.*/ + /* pTokDirName - holds the directory name that gets used by strtok. 
*/ + pCrtDirName = (char*)malloc( strlen(clientDir) + 1 ); + pTokDirName = (char*)malloc( strlen(clientDir) + 1 ); + strcpy( pTokDirName, clientDir ); + + pToken = strtok_r( pTokDirName, "\\/", &save_ptr ); + + strcpy( pCrtDirName, pToken ); + rc = pstmkdir( pCrtDirName ); + pToken = strtok_r( NULL, "\\/", &save_ptr ); + while ( (pToken != NULL) && (rc == 0) ) + { + /* Append the next directory level and try to create it */ + strcat( pCrtDirName, "/" ); + strcat( pCrtDirName, pToken ); + rc = pstmkdir( pCrtDirName ); + pToken = strtok_r( NULL, "\\/", &save_ptr ); + } + + *handle = clientDir; + + free(pTokDirName); + free(pCrtDirName); + free(perserverURI); + + FUNC_EXIT_RC(rc); + return rc; +} + +/** Function to create a directory. + * Returns 0 on success or if the directory already exists. + */ +int pstmkdir( char *pPathname ) +{ + int rc = 0; + + FUNC_ENTRY; +#if defined(WIN32) || defined(WIN64) + if ( _mkdir( pPathname ) != 0 ) + { +#else + /* Create a directory with read, write and execute access for the owner and read access for the group */ +#if !defined(_WRS_KERNEL) + if ( mkdir( pPathname, S_IRWXU | S_IRGRP ) != 0 ) +#else + if ( mkdir( pPathname ) != 0 ) +#endif /* !defined(_WRS_KERNEL) */ + { +#endif + if ( errno != EEXIST ) + rc = MQTTCLIENT_PERSISTENCE_ERROR; + } + + FUNC_EXIT_RC(rc); + return rc; +} + + + +/** Write wire message to the client persistence directory. 
+ * See ::Persistence_put + */ +int pstput(void* handle, char* key, int bufcount, char* buffers[], int buflens[]) +{ + int rc = 0; + char *clientDir = handle; + char *file; + FILE *fp; + size_t bytesWritten = 0, + bytesTotal = 0; + int i; + + FUNC_ENTRY; + if (clientDir == NULL) + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } + + /* consider '/' + '\0' */ + file = malloc(strlen(clientDir) + strlen(key) + strlen(MESSAGE_FILENAME_EXTENSION) + 2 ); + sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION); + + fp = fopen(file, "wb"); + if ( fp != NULL ) + { + for(i=0; i<bufcount; i++) + { + bytesTotal += buflens[i]; + bytesWritten += fwrite(buffers[i], sizeof(char), buflens[i], fp ); + } + fclose(fp); + fp = NULL; + } else + rc = MQTTCLIENT_PERSISTENCE_ERROR; + + if (bytesWritten != bytesTotal) + { + pstremove(handle, key); + rc = MQTTCLIENT_PERSISTENCE_ERROR; + } + + free(file); + +exit: + FUNC_EXIT_RC(rc); + return rc; +}; + + +/** Retrieve a wire message from the client persistence directory. 
+ * See ::Persistence_get + */ +int pstget(void* handle, char* key, char** buffer, int* buflen) +{ + int rc = 0; + FILE *fp; + char *clientDir = handle; + char *file; + char *buf; + unsigned long fileLen = 0; + unsigned long bytesRead = 0; + + FUNC_ENTRY; + if (clientDir == NULL) + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } + + /* consider '/' + '\0' */ + file = malloc(strlen(clientDir) + strlen(key) + strlen(MESSAGE_FILENAME_EXTENSION) + 2); + sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION); + + fp = fopen(file, "rb"); + if ( fp != NULL ) + { + fseek(fp, 0, SEEK_END); + fileLen = ftell(fp); + fseek(fp, 0, SEEK_SET); + buf=(char *)malloc(fileLen); + bytesRead = (int)fread(buf, sizeof(char), fileLen, fp); + *buffer = buf; + *buflen = bytesRead; + if ( bytesRead != fileLen ) + rc = MQTTCLIENT_PERSISTENCE_ERROR; + fclose(fp); + fp = NULL; + } else + rc = MQTTCLIENT_PERSISTENCE_ERROR; + + free(file); + /* the caller must free buf */ + +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + + +/** Delete a persisted message from the client persistence directory. + * See ::Persistence_remove + */ +int pstremove(void* handle, char* key) +{ + int rc = 0; + char *clientDir = handle; + char *file; + + FUNC_ENTRY; + if (clientDir == NULL) + { + return rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } + + /* consider '/' + '\0' */ + file = malloc(strlen(clientDir) + strlen(key) + strlen(MESSAGE_FILENAME_EXTENSION) + 2); + sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION); + +#if defined(WIN32) || defined(WIN64) + if ( _unlink(file) != 0 ) + { +#else + if ( unlink(file) != 0 ) + { +#endif + if ( errno != ENOENT ) + rc = MQTTCLIENT_PERSISTENCE_ERROR; + } + + free(file); + +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** Delete client persistence directory (if empty). 
+ * See ::Persistence_close + */ +int pstclose(void* handle) +{ + int rc = 0; + char *clientDir = handle; + + FUNC_ENTRY; + if (clientDir == NULL) + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } + +#if defined(WIN32) || defined(WIN64) + if ( _rmdir(clientDir) != 0 ) + { +#else + if ( rmdir(clientDir) != 0 ) + { +#endif + if ( errno != ENOENT && errno != ENOTEMPTY ) + rc = MQTTCLIENT_PERSISTENCE_ERROR; + } + + free(clientDir); + +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** Returns whether if a wire message is persisted in the client persistence directory. + * See ::Persistence_containskey + */ +int pstcontainskey(void *handle, char *key) +{ + int rc = 0; + char *clientDir = handle; + + FUNC_ENTRY; + if (clientDir == NULL) + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } + +#if defined(WIN32) || defined(WIN64) + rc = containskeyWin32(clientDir, key); +#else + rc = containskeyUnix(clientDir, key); +#endif + +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +#if defined(WIN32) || defined(WIN64) +int containskeyWin32(char *dirname, char *key) +{ + int notFound = MQTTCLIENT_PERSISTENCE_ERROR; + int fFinished = 0; + char *filekey, *ptraux; + char dir[MAX_PATH+1]; + WIN32_FIND_DATAA FileData; + HANDLE hDir; + + FUNC_ENTRY; + sprintf(dir, "%s/*", dirname); + + hDir = FindFirstFileA(dir, &FileData); + if (hDir != INVALID_HANDLE_VALUE) + { + while (!fFinished) + { + if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE) + { + filekey = malloc(strlen(FileData.cFileName) + 1); + strcpy(filekey, FileData.cFileName); + ptraux = strstr(filekey, MESSAGE_FILENAME_EXTENSION); + if ( ptraux != NULL ) + *ptraux = '\0' ; + if(strcmp(filekey, key) == 0) + { + notFound = 0; + fFinished = 1; + } + free(filekey); + } + if (!FindNextFileA(hDir, &FileData)) + { + if (GetLastError() == ERROR_NO_MORE_FILES) + fFinished = 1; + } + } + FindClose(hDir); + } + + FUNC_EXIT_RC(notFound); + return notFound; +} +#else +int containskeyUnix(char *dirname, char *key) +{ + int 
notFound = MQTTCLIENT_PERSISTENCE_ERROR; + char *filekey, *ptraux; + DIR *dp; + struct dirent *dir_entry; + struct stat stat_info; + + FUNC_ENTRY; + if((dp = opendir(dirname)) != NULL) + { + while((dir_entry = readdir(dp)) != NULL && notFound) + { + char* filename = malloc(strlen(dirname) + strlen(dir_entry->d_name) + 2); + sprintf(filename, "%s/%s", dirname, dir_entry->d_name); + lstat(filename, &stat_info); + free(filename); + if(S_ISREG(stat_info.st_mode)) + { + filekey = malloc(strlen(dir_entry->d_name) + 1); + strcpy(filekey, dir_entry->d_name); + ptraux = strstr(filekey, MESSAGE_FILENAME_EXTENSION); + if ( ptraux != NULL ) + *ptraux = '\0' ; + if(strcmp(filekey, key) == 0) + notFound = 0; + free(filekey); + } + } + closedir(dp); + } + + FUNC_EXIT_RC(notFound); + return notFound; +} +#endif + + +/** Delete all the persisted message in the client persistence directory. + * See ::Persistence_clear + */ +int pstclear(void *handle) +{ + int rc = 0; + char *clientDir = handle; + + FUNC_ENTRY; + if (clientDir == NULL) + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } + +#if defined(WIN32) || defined(WIN64) + rc = clearWin32(clientDir); +#else + rc = clearUnix(clientDir); +#endif + +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +#if defined(WIN32) || defined(WIN64) +int clearWin32(char *dirname) +{ + int rc = 0; + int fFinished = 0; + char *file; + char dir[MAX_PATH+1]; + WIN32_FIND_DATAA FileData; + HANDLE hDir; + + FUNC_ENTRY; + sprintf(dir, "%s/*", dirname); + + hDir = FindFirstFileA(dir, &FileData); + if (hDir != INVALID_HANDLE_VALUE) + { + while (!fFinished) + { + if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE) + { + file = malloc(strlen(dirname) + strlen(FileData.cFileName) + 2); + sprintf(file, "%s/%s", dirname, FileData.cFileName); + rc = remove(file); + free(file); + if ( rc != 0 ) + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + break; + } + } + if (!FindNextFileA(hDir, &FileData)) + { + if (GetLastError() == ERROR_NO_MORE_FILES) + fFinished = 1; 
+ } + } + FindClose(hDir); + } else + rc = MQTTCLIENT_PERSISTENCE_ERROR; + + FUNC_EXIT_RC(rc); + return rc; +} +#else +int clearUnix(char *dirname) +{ + int rc = 0; + DIR *dp; + struct dirent *dir_entry; + struct stat stat_info; + + FUNC_ENTRY; + if((dp = opendir(dirname)) != NULL) + { + while((dir_entry = readdir(dp)) != NULL && rc == 0) + { + lstat(dir_entry->d_name, &stat_info); + if(S_ISREG(stat_info.st_mode)) + { + if ( remove(dir_entry->d_name) != 0 ) + rc = MQTTCLIENT_PERSISTENCE_ERROR; + } + } + closedir(dp); + } else + rc = MQTTCLIENT_PERSISTENCE_ERROR; + + FUNC_EXIT_RC(rc); + return rc; +} +#endif + + +/** Returns the keys (file names w/o the extension) in the client persistence directory. + * See ::Persistence_keys + */ +int pstkeys(void *handle, char ***keys, int *nkeys) +{ + int rc = 0; + char *clientDir = handle; + + FUNC_ENTRY; + if (clientDir == NULL) + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } + +#if defined(WIN32) || defined(WIN64) + rc = keysWin32(clientDir, keys, nkeys); +#else + rc = keysUnix(clientDir, keys, nkeys); +#endif + +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +#if defined(WIN32) || defined(WIN64) +int keysWin32(char *dirname, char ***keys, int *nkeys) +{ + int rc = 0; + char **fkeys = NULL; + int nfkeys = 0; + char dir[MAX_PATH+1]; + WIN32_FIND_DATAA FileData; + HANDLE hDir; + int fFinished = 0; + char *ptraux; + int i; + + FUNC_ENTRY; + sprintf(dir, "%s/*", dirname); + + /* get number of keys */ + hDir = FindFirstFileA(dir, &FileData); + if (hDir != INVALID_HANDLE_VALUE) + { + while (!fFinished) + { + if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE) + nfkeys++; + if (!FindNextFileA(hDir, &FileData)) + { + if (GetLastError() == ERROR_NO_MORE_FILES) + fFinished = 1; + } + } + FindClose(hDir); + } else + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } + + if (nfkeys != 0 ) + fkeys = (char **)malloc(nfkeys * sizeof(char *)); + + /* copy the keys */ + hDir = FindFirstFileA(dir, &FileData); + if (hDir != 
INVALID_HANDLE_VALUE) + { + fFinished = 0; + i = 0; + while (!fFinished) + { + if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE) + { + fkeys[i] = malloc(strlen(FileData.cFileName) + 1); + strcpy(fkeys[i], FileData.cFileName); + ptraux = strstr(fkeys[i], MESSAGE_FILENAME_EXTENSION); + if ( ptraux != NULL ) + *ptraux = '\0' ; + i++; + } + if (!FindNextFileA(hDir, &FileData)) + { + if (GetLastError() == ERROR_NO_MORE_FILES) + fFinished = 1; + } + } + FindClose(hDir); + } else + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } + + *nkeys = nfkeys; + *keys = fkeys; + /* the caller must free keys */ + +exit: + FUNC_EXIT_RC(rc); + return rc; +} +#else +int keysUnix(char *dirname, char ***keys, int *nkeys) +{ + int rc = 0; + char **fkeys = NULL; + int nfkeys = 0; + char *ptraux; + int i; + DIR *dp; + struct dirent *dir_entry; + struct stat stat_info; + + FUNC_ENTRY; + /* get number of keys */ + if((dp = opendir(dirname)) != NULL) + { + while((dir_entry = readdir(dp)) != NULL) + { + char* temp = malloc(strlen(dirname)+strlen(dir_entry->d_name)+2); + + sprintf(temp, "%s/%s", dirname, dir_entry->d_name); + if (lstat(temp, &stat_info) == 0 && S_ISREG(stat_info.st_mode)) + nfkeys++; + free(temp); + } + closedir(dp); + } else + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } + + if (nfkeys != 0) + { + fkeys = (char **)malloc(nfkeys * sizeof(char *)); + + /* copy the keys */ + if((dp = opendir(dirname)) != NULL) + { + i = 0; + while((dir_entry = readdir(dp)) != NULL) + { + char* temp = malloc(strlen(dirname)+strlen(dir_entry->d_name)+2); + + sprintf(temp, "%s/%s", dirname, dir_entry->d_name); + if (lstat(temp, &stat_info) == 0 && S_ISREG(stat_info.st_mode)) + { + fkeys[i] = malloc(strlen(dir_entry->d_name) + 1); + strcpy(fkeys[i], dir_entry->d_name); + ptraux = strstr(fkeys[i], MESSAGE_FILENAME_EXTENSION); + if ( ptraux != NULL ) + *ptraux = '\0' ; + i++; + } + free(temp); + } + closedir(dp); + } else + { + rc = MQTTCLIENT_PERSISTENCE_ERROR; + goto exit; + } 
+ } + + *nkeys = nfkeys; + *keys = fkeys; + /* the caller must free keys */ + +exit: + FUNC_EXIT_RC(rc); + return rc; +} +#endif + + + +#if defined(UNIT_TESTS) +int main (int argc, char *argv[]) +{ +#define MSTEM "m-" +#define NMSGS 10 +#define NBUFS 4 +#define NDEL 2 +#define RC !rc ? "(Success)" : "(Failed) " + + int rc; + char *handle; + char *perdir = "."; + const char *clientID = "TheUTClient"; + const char *serverURI = "127.0.0.1:1883"; + + char *stem = MSTEM; + int msgId, i; + int nm[NDEL] = {5 , 8}; /* msgIds to get and remove */ + + char *key; + char **keys; + int nkeys; + char *buffer, *buff; + int buflen; + + int nbufs = NBUFS; + char *bufs[NBUFS] = {"m0", "mm1", "mmm2" , "mmmm3"}; /* message content */ + int buflens[NBUFS]; + for(i=0;i<nbufs;i++) + buflens[i]=strlen(bufs[i]); + + /* open */ + /* printf("Persistence directory : %s\n", perdir); */ + rc = pstopen((void**)&handle, clientID, serverURI, perdir); + printf("%s Persistence directory for client %s : %s\n", RC, clientID, handle); + + /* put */ + for(msgId=0;msgId<NMSGS;msgId++) + { + key = malloc(MESSAGE_FILENAME_LENGTH + 1); + sprintf(key, "%s%d", stem, msgId); + rc = pstput(handle, key, nbufs, bufs, buflens); + printf("%s Adding message %s\n", RC, key); + free(key); + } + + /* keys ,ie, list keys added */ + rc = pstkeys(handle, &keys, &nkeys); + printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle); + for(i=0;i<nkeys;i++) + printf("%13s\n", keys[i]); + + if (keys !=NULL) + free(keys); + + /* containskey */ + for(i=0;i<NDEL;i++) + { + key = malloc(MESSAGE_FILENAME_LENGTH + 1); + sprintf(key, "%s%d", stem, nm[i]); + rc = pstcontainskey(handle, key); + printf("%s Message %s is persisted ?\n", RC, key); + free(key); + } + + /* get && remove*/ + for(i=0;i<NDEL;i++) + { + key = malloc(MESSAGE_FILENAME_LENGTH + 1); + sprintf(key, "%s%d", stem, nm[i]); + rc = pstget(handle, key, &buffer, &buflen); + buff = malloc(buflen+1); + memcpy(buff, buffer, buflen); + buff[buflen] = '\0'; + 
printf("%s Retrieving message %s : %s\n", RC, key, buff); + rc = pstremove(handle, key); + printf("%s Removing message %s\n", RC, key); + free(key); + free(buff); + free(buffer); + } + + /* containskey */ + for(i=0;i<NDEL;i++) + { + key = malloc(MESSAGE_FILENAME_LENGTH + 1); + sprintf(key, "%s%d", stem, nm[i]); + rc = pstcontainskey(handle, key); + printf("%s Message %s is persisted ?\n", RC, key); + free(key); + } + + /* keys ,ie, list keys added */ + rc = pstkeys(handle, &keys, &nkeys); + printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle); + for(i=0;i<nkeys;i++) + printf("%13s\n", keys[i]); + + if (keys != NULL) + free(keys); + + + /* close -> it will fail, since client persistence directory is not empty */ + rc = pstclose(&handle); + printf("%s Closing client persistence directory for client %s\n", RC, clientID); + + /* clear */ + rc = pstclear(handle); + printf("%s Deleting all persisted messages in %s\n", RC, handle); + + /* keys ,ie, list keys added */ + rc = pstkeys(handle, &keys, &nkeys); + printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle); + for(i=0;i<nkeys;i++) + printf("%13s\n", keys[i]); + + if ( keys != NULL ) + free(keys); + + /* close */ + rc = pstclose(&handle); + printf("%s Closing client persistence directory for client %s\n", RC, clientID); +} +#endif + + +#endif /* NO_PERSISTENCE */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h new file mode 100644 index 0000000..27fedd6 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h @@ -0,0 +1,33 @@ +/******************************************************************************* + * Copyright (c) 2009, 2013 IBM Corp. + * + * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +/** 8.3 filesystem */ +#define MESSAGE_FILENAME_LENGTH 8 +/** Extension of the filename */ +#define MESSAGE_FILENAME_EXTENSION ".msg" + +/* prototypes of the functions for the default file system persistence */ +int pstopen(void** handle, const char* clientID, const char* serverURI, void* context); +int pstclose(void* handle); +int pstput(void* handle, char* key, int bufcount, char* buffers[], int buflens[]); +int pstget(void* handle, char* key, char** buffer, int* buflen); +int pstremove(void* handle, char* key); +int pstkeys(void* handle, char*** keys, int* nkeys); +int pstclear(void* handle); +int pstcontainskey(void* handle, char* key); + +int pstmkdir(char *pPathname); + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTProtocol.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/MQTTProtocol.h b/thirdparty/paho.mqtt.c/src/MQTTProtocol.h new file mode 100644 index 0000000..7478103 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/MQTTProtocol.h @@ -0,0 +1,46 @@ +/******************************************************************************* + * Copyright (c) 2009, 2014 IBM Corp. + * + * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs - MQTT 3.1.1 updates + *******************************************************************************/ + +#if !defined(MQTTPROTOCOL_H) +#define MQTTPROTOCOL_H + +#include "LinkedList.h" +#include "MQTTPacket.h" +#include "Clients.h" + /* Upper limits; presumably both follow from MQTT's 16-bit length and id fields - TODO confirm */ +#define MAX_MSG_ID 65535 +#define MAX_CLIENTID_LEN 65535 + /** Pairs a socket with a publication; presumably one entry of MQTTProtocol.pending_writes - TODO confirm */ +typedef struct +{ + int socket; /* socket the write belongs to */ + Publications* p; /* publication associated with the write */ +} pending_write; + + /** Protocol-wide state: two lists and two message counters. */ +typedef struct +{ + List publications; + unsigned int msgs_received; /* counter; where it is updated is not visible in this header */ + unsigned int msgs_sent; /* counter; where it is updated is not visible in this header */ + List pending_writes; /* for qos 0 writes not complete */ +} MQTTProtocol; + + +#include "MQTTProtocolOut.h" + +#endif
